// Copyright 2019-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mqbc_incoreclusterstateledger.cpp                                  -*-C++-*-
#include <mqbc_incoreclusterstateledger.h>

#include <mqbscm_version.h>
// ----------------------------------------------------------------------------
//                                   NOTICE
//
// Strong consistency mode is neither implemented nor tested for this
// component.
// ----------------------------------------------------------------------------

// MQB
#include <mqbc_clusterstateledgerutil.h>
#include <mqbc_incoreclusterstateledgeriterator.h>
#include <mqbnet_cluster.h>
#include <mqbs_filestoreprotocol.h>
#include <mqbs_storageutil.h>
#include <mqbsi_log.h>
#include <mqbsl_ledger.h>
#include <mqbsl_memorymappedondisklog.h>
#include <mqbu_exit.h>
#include <mqbu_storagekey.h>

// BMQ
#include <bmqp_protocol.h>

// MWC
#include <mwcio_status.h>
#include <mwcsys_time.h>
#include <mwctsk_alarmlog.h>
#include <mwcu_blobobjectproxy.h>
#include <mwcu_memoutstream.h>

// BDE
#include <bdlbb_blob.h>
#include <bdlbb_blobutil.h>
#include <bdlde_md5.h>
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdls_filesystemutil.h>
#include <bdlsb_memoutstreambuf.h>
#include <bdlt_currenttime.h>
#include <bdlt_datetimeutil.h>
#include <bdlt_epochutil.h>
#include <bsl_cstring.h>
#include <bsl_ctime.h>
#include <bsl_memory.h>
#include <bsl_unordered_set.h>
#include <bsl_utility.h>
#include <bsl_vector.h>
#include <bslmf_allocatorargt.h>
#include <bsls_annotation.h>
#include <bsls_keyword.h>
#include <bsls_types.h>

namespace BloombergLP {
namespace mqbc {

namespace {

// Glob-style pattern for cluster state ledger (CSL) log file names.  It has
// the same 'bmq_csl_' prefix and '.bmq_csl' extension as the names built by
// 'IncoreClusterStateLeger_LogIdGenerator::generateLogId' below.
const char k_FILE_PATTERN[] = "bmq_csl_*.bmq_csl";

/// Append the current UTC date and time to the specified `result` in
/// YYYYMMDD_HHMMSS format, where YYYY is the calendar year.  The behavior
/// is undefined unless `result` is a valid pointer.
void appendFormattedDatetime(bsl::string* result)
{
    // PRECONDITIONS
    BSLS_ASSERT_SAFE(result);

    enum {
        e_BUFFER_SIZE = 16  // 'YYYYMMDD_HHMMSS' (15 chars) + null-character
    };

    bdlt::Datetime now = bdlt::CurrentTime::utc();
    char           buffer[e_BUFFER_SIZE];
    struct bsl::tm timeStruct = bdlt::DatetimeUtil::convertToTm(now);

    // NOTE: '%Y' (calendar year) is required here.  '%G' is the ISO 8601
    //       week-based year, which differs from the calendar year for dates
    //       close to January 1st and would mislabel the file.
    bsl::strftime(buffer, e_BUFFER_SIZE, "%Y%m%d_%H%M%S", &timeStruct);
    result->append(buffer);
}

/// Return the current UTC time, expressed as the 64-bit `time_t`-style
/// value produced by `bdlt::EpochUtil::convertToTimeT64`.
bsls::Types::Uint64 currentTime()
{
    const bdlt::Datetime nowUtc = bdlt::CurrentTime::utc();
    return bdlt::EpochUtil::convertToTimeT64(nowUtc);
}

/// Append to the specified `event` a `bmqp::EventHeader` of type
/// `e_CLUSTER_STATE` followed by the contents of the specified `record`.
/// The length stored in the header is the size of the header plus the
/// length of `record`.  The behavior is undefined unless `event` is a valid
/// pointer.
void constructEventBlob(bdlbb::Blob* event, const bdlbb::Blob& record)
{
    // PRECONDITIONS
    BSLS_ASSERT_SAFE(event);

    const int headerSize = sizeof(bmqp::EventHeader);

    bmqp::EventHeader header(bmqp::EventType::e_CLUSTER_STATE);
    header.setLength(headerSize + record.length());

    // The header goes first, immediately followed by the record payload.
    bdlbb::BlobUtil::append(event,
                            reinterpret_cast<char*>(&header),
                            headerSize);
    bdlbb::BlobUtil::append(event, record);
}

}  // close anonymous namespace

// ============================================
// class IncoreClusterStateLeger_LogIdGenerator
// ============================================

/// Implementation of the `mqbsi::LogIdGenerator` protocol used by this
/// component.  It produces log names of the form
/// `bmq_csl_<datetime>_<LOG_ID>.bmq_csl` and keeps track of the log IDs it
/// knows about in an in-memory set.
class IncoreClusterStateLeger_LogIdGenerator : public mqbsi::LogIdGenerator {
  private:
    // DATA
    bsl::unordered_set<mqbu::StorageKey> d_logIds;
    // Set of log IDs known to this object: those registered through
    // 'registerLogId', and the set handed to
    // 'mqbs::StorageUtil::generateStorageKey' by 'generateLogId'
    // (presumably updated there with each generated key -- TODO confirm).

  public:
    // CREATORS

    /// Create a `LogIdGenerator` object using the specified `allocator` to
    /// supply memory.
    IncoreClusterStateLeger_LogIdGenerator(bslma::Allocator* allocator);

    /// Destructor for this object.
    virtual ~IncoreClusterStateLeger_LogIdGenerator() BSLS_KEYWORD_OVERRIDE;

    // MANIPULATORS
    //   (virtual mqbsi::LogIdGenerator)

    /// Register the specified `logId` among those generated by this object
    /// and return `true` if successful, `false` otherwise (e.g., `logId` is
    /// already registered).  The effect of this is that `logId` will not be
    /// returned by a future call to `generateLogId(...)`.
    virtual bool
    registerLogId(const mqbu::StorageKey& logId) BSLS_KEYWORD_OVERRIDE;

    /// Create a new log name and a new unique log ID that has not before
    /// been generated by this object and load them into the specified
    /// `logName` and `logId`.  Both arguments are output-only: any value
    /// they hold on entry is overwritten.
    virtual void generateLogId(bsl::string*      logName,
                               mqbu::StorageKey* logId) BSLS_KEYWORD_OVERRIDE;
};

// --------------------------------------------
// class IncoreClusterStateLeger_LogIdGenerator
// --------------------------------------------

// CREATORS
// Construct the generator with an empty set of known log IDs; the set uses
// the specified 'allocator' to supply memory.
IncoreClusterStateLeger_LogIdGenerator::IncoreClusterStateLeger_LogIdGenerator(
    bslma::Allocator* allocator)
: d_logIds(allocator)
{
    // NOTHING
}

// No explicit cleanup is needed: the only data member, 'd_logIds', releases
// its own memory when destroyed.
IncoreClusterStateLeger_LogIdGenerator ::
    ~IncoreClusterStateLeger_LogIdGenerator()
{
    // NOTHING
}

// MANIPULATORS
//   (virtual mqbsi::LogIdGenerator)
// Record the specified 'logId' in the set of known log IDs.  Return 'true'
// if it was newly added, and 'false' if it was already present.
bool IncoreClusterStateLeger_LogIdGenerator::registerLogId(
    const mqbu::StorageKey& logId)
{
    const bool isNewlyRegistered = d_logIds.insert(logId).second;
    return isNewlyRegistered;
}

// Build a new log name of the form
// 'bmq_csl_YYYYMMDD_HHMMSS_<LOG_ID>.bmq_csl' and load it into the specified
// 'name'; load the associated new log ID into the specified 'logId'.  Both
// arguments are output-only.  The behavior is undefined unless both are valid
// pointers.
void IncoreClusterStateLeger_LogIdGenerator::generateLogId(
    bsl::string*      name,
    mqbu::StorageKey* logId)
{
    // PRECONDITIONS
    BSLS_ASSERT_SAFE(name);
    BSLS_ASSERT_SAFE(logId);

    // Create log name prefix: 'bmq_csl_YYYYMMDD_HHMMSS'
    bsl::string logName;
    logName.append("bmq_csl_");
    appendFormattedDatetime(&logName);

    // Generate the log ID from the freshly built prefix.  '*name' must not
    // be used as the input value here: it is an output argument and holds
    // whatever the caller happened to pass in (typically an empty or stale
    // string) until it is assigned at the end of this function.
    mqbs::StorageUtil::generateStorageKey(logId, &d_logIds, logName);

    // Append logId to log name: 'bmq_csl_YYYYMMDD_HHMMSS_<LOG_ID>.bmq_csl'
    char logIdStr[mqbu::StorageKey::e_KEY_LENGTH_HEX + 1];
    logId->loadHex(logIdStr);

    // Null-terminate explicitly so that 'logIdStr' can be appended as a
    // C-string.
    logIdStr[mqbu::StorageKey::e_KEY_LENGTH_HEX] = '\0';
    logName.append("_").append(logIdStr).append(".bmq_csl");
    name->assign(logName);
}

// ------------------------------
// class IncoreClusterStateLedger
// ------------------------------

// PRIVATE MANIPULATORS
// Placeholder for cleaning up the log at the specified 'logPath'.  Currently
// not implemented: 'logPath' is ignored and 0 is always returned.
int IncoreClusterStateLedger::cleanupLog(
    BSLS_ANNOTATION_UNUSED const bsl::string& logPath)
{
    // TODO: Implement

    return 0;
}

// Handle a rollover of the ledger from the log identified by the specified
// 'oldLogId' to the log identified by the specified 'newLogId'.  Write the CSL
// file header into the new log and, unless this is a brand new ledger
// ('oldLogId' is null) or self node is not available, re-write all uncommitted
// advisories followed by a snapshot of the current cluster state.  Return 0 on
// success, and a non-zero value otherwise; a failure code is built as
// '10 * <rc of the failed call> + <RcEnum category>'.  The behavior is
// undefined unless 'newLogId' is not null.
int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId,
                                              const mqbu::StorageKey& newLogId)
{
    // PRECONDITIONS
    BSLS_ASSERT_SAFE(!newLogId.isNull());

    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS = 0  // Success
        ,
        rc_WRITE_HEADER_FAILURE = -1  // Fail to write CSL file header to
                                      // ledger
        ,
        rc_CREATE_RECORD_FAILURE = -2  // Fail to create record
        ,
        rc_WRITE_RECORD_FAILURE = -3  // Fail to write record to ledger
    };

    BALL_LOG_INFO << description() << "Rolling over from log with logId ["
                  << oldLogId << "] to new log with logId [" << newLogId
                  << "]";

    // The file header is written first, before any record.
    int rc = ClusterStateLedgerUtil::writeFileHeader(d_ledger_mp.get(),
                                                     newLogId);
    if (rc != 0) {
        return 10 * rc + rc_WRITE_HEADER_FAILURE;  // RETURN
    }

    if (oldLogId.isNull()) {
        // If this is a brand new ledger
        return rc_SUCCESS;  // RETURN
    }

    if (bmqp_ctrlmsg::NodeStatus::E_AVAILABLE !=
        d_clusterData_p->membership().selfNodeStatus()) {
        // If self is not available, then the cluster state snapshot is likely
        // incomplete, so there is no point in writing it.
        return rc_SUCCESS;  // RETURN
    }

    // Determine the correct LSN to put in the cluster state snapshot, which is
    // the maximum of current LSN from ElectorInfo and the largest LSN from
    // uncommitted advisories.
    bmqp_ctrlmsg::LeaderMessageSequence snapshotLSN =
        d_clusterData_p->electorInfo().leaderMessageSequence();

    // Write uncommitted advisories into ledger
    for (AdvisoriesMapIter advisoryIt = d_uncommittedAdvisories.begin();
         advisoryIt != d_uncommittedAdvisories.end();
         ++advisoryIt) {
        ClusterMessageInfo& info = advisoryIt->second;

        // A leader advisory is recorded as a snapshot; any other advisory is
        // recorded as an update.
        bdlbb::Blob                  record(d_bufferFactory_p, d_allocator_p);
        ClusterStateRecordType::Enum recordType =
            info.d_clusterMessage.choice().isLeaderAdvisoryValue()
                ? ClusterStateRecordType::e_SNAPSHOT
                : ClusterStateRecordType::e_UPDATE;
        rc = ClusterStateLedgerUtil::appendRecord(&record,
                                                  info.d_clusterMessage,
                                                  advisoryIt->first,
                                                  currentTime(),
                                                  recordType);
        if (rc != 0) {
            return 10 * rc + rc_CREATE_RECORD_FAILURE;  // RETURN
        }

        // The advisory's record id is overwritten with the id of the record
        // just written.
        rc = d_ledger_mp->writeRecord(&(info.d_recordId),
                                      record,
                                      mwcu::BlobPosition(),
                                      record.length());
        if (rc != 0) {
            return 10 * rc + rc_WRITE_RECORD_FAILURE;  // RETURN
        }

        // Increment snapshot LSN if needed
        if (advisoryIt->first > snapshotLSN) {
            snapshotLSN = advisoryIt->first;
        }
    }

    // Populate cluster state snapshot
    bmqp_ctrlmsg::LeaderAdvisory leaderAdvisory;
    leaderAdvisory.sequenceNumber() = snapshotLSN;

    // One entry per partition of the cluster state; partitions reporting an
    // invalid partition id are left out of the snapshot.
    bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo>& partitions =
        leaderAdvisory.partitions();
    for (int pid = 0;
         pid < static_cast<int>(d_clusterState_p->partitions().size());
         ++pid) {
        const ClusterStatePartitionInfo& pinfo = d_clusterState_p->partition(
            pid);
        if (pinfo.partitionId() == mqbs::DataStore::k_INVALID_PARTITION_ID) {
            continue;  // CONTINUE
        }
        bmqp_ctrlmsg::PartitionPrimaryInfo info;

        // A partition without a primary node is recorded with the invalid
        // node id.
        info.partitionId()    = pid;
        info.primaryNodeId()  = pinfo.primaryNode()
                                    ? pinfo.primaryNode()->nodeId()
                                    : mqbnet::Cluster::k_INVALID_NODE_ID;
        info.primaryLeaseId() = pinfo.primaryLeaseId();
        partitions.push_back(info);
    }

    // One entry per queue, across all domains of the cluster state.
    for (mqbc::ClusterState::DomainStatesCIter domCit =
             d_clusterState_p->domainStates().cbegin();
         domCit != d_clusterState_p->domainStates().cend();
         ++domCit) {
        for (mqbc::ClusterState::UriToQueueInfoMapCIter citer =
                 domCit->second->queuesInfo().cbegin();
             citer != domCit->second->queuesInfo().cend();
             ++citer) {
            const mqbc::ClusterState::QueueInfoSp& infoSp = citer->second;

            bmqp_ctrlmsg::QueueInfo queueInfo;
            infoSp->key().loadBinary(&queueInfo.key());
            queueInfo.uri()         = infoSp->uri().asString();
            queueInfo.partitionId() = infoSp->partitionId();

            leaderAdvisory.queues().push_back(queueInfo);
        }
    }

    // Write snapshot into ledger
    ClusterMessageInfo info;
    info.d_clusterMessage.choice().makeLeaderAdvisory(leaderAdvisory);

    bdlbb::Blob record(d_bufferFactory_p, d_allocator_p);
    rc = ClusterStateLedgerUtil::appendRecord(
        &record,
        info.d_clusterMessage,
        leaderAdvisory.sequenceNumber(),
        currentTime(),
        ClusterStateRecordType::e_SNAPSHOT);
    if (rc != 0) {
        return 10 * rc + rc_CREATE_RECORD_FAILURE;  // RETURN
    }

    rc = d_ledger_mp->writeRecord(&(info.d_recordId),
                                  record,
                                  mwcu::BlobPosition(),
                                  record.length());
    if (rc != 0) {
        return 10 * rc + rc_WRITE_RECORD_FAILURE;  // RETURN
    }

    return rc_SUCCESS;
}

// Apply the advisory in the specified 'clusterMessage', having the specified
// 'sequenceNumber' and the specified 'recordType'.  The advisory is rejected
// if its sequence number is lower than the current leader message sequence, or
// if an uncommitted advisory with the same sequence number already exists.
// Otherwise a record is created for it and handed to 'applyRecordInternal'.
// Return 0 on success, and a non-zero value otherwise.  The behavior is
// undefined unless the choice of 'clusterMessage' is defined.
int IncoreClusterStateLedger::applyAdvisoryInternal(
    const bmqp_ctrlmsg::ClusterMessage&        clusterMessage,
    const bmqp_ctrlmsg::LeaderMessageSequence& sequenceNumber,
    ClusterStateRecordType::Enum               recordType)
{
    // TBD: What if leader is not active?
    // A leader starts out passive and only becomes active once it has synced
    // its cluster state with that of the followers.  'ClusterOrchestrator'
    // currently owns that responsibility, and initially it should keep it.
    // Once 'ClusterStateLedger' is integrated in the existing code base, that
    // logic can be moved here (possibly during the integration itself, if it
    // is not too much work).

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(!clusterMessage.choice().isUndefinedValue());

    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS                  = 0,   // Success
        rc_ADVISORY_STALE           = -1,  // Advisory is stale
        rc_ADVISORY_ALREADY_APPLIED = -2,  // Advisory was already applied
        rc_CREATE_RECORD_FAILURE    = -3,  // Fail to create advisory record
        rc_APPLY_RECORD_FAILURE     = -4   // Fail to apply advisory record
    };

    // Guard 1: reject an advisory that is older than the current leader
    // message sequence.
    const bool isStale =
        sequenceNumber <
        d_clusterData_p->electorInfo().leaderMessageSequence();
    if (isStale) {
        BALL_LOG_WARN << description()
                      << "Failed to apply advisory: " << clusterMessage
                      << ". Reason: advisory is stale (sequenceNumber: "
                      << sequenceNumber << ", leaderMessageSeq: "
                      << d_clusterData_p->electorInfo().leaderMessageSequence()
                      << ").";
        return rc_ADVISORY_STALE;  // RETURN
    }

    // Guard 2: reject an advisory that is already pending commit.
    const AdvisoriesMapIter existing = d_uncommittedAdvisories.find(
        sequenceNumber);
    if (d_uncommittedAdvisories.end() != existing) {
        BALL_LOG_WARN << description()
                      << "Failed to apply advisory: " << clusterMessage
                      << ". Reason: advisory was already applied. ";
        return rc_ADVISORY_ALREADY_APPLIED;  // RETURN
    }

    // Do leader logic: apply and broadcast advisory, then apply its ack
    bdlbb::Blob recordBlob(d_bufferFactory_p, d_allocator_p);
    const int   createRc = ClusterStateLedgerUtil::appendRecord(
        &recordBlob,
        clusterMessage,
        sequenceNumber,
        currentTime(),
        recordType);
    if (0 != createRc) {
        return 10 * createRc + rc_CREATE_RECORD_FAILURE;  // RETURN
    }

    // The record was built from scratch above, so it starts at offset 0.
    const int applyRc = applyRecordInternal(recordBlob,
                                            0,
                                            clusterMessage,
                                            sequenceNumber,
                                            recordType);
    if (0 != applyRc) {
        return 10 * applyRc + rc_APPLY_RECORD_FAILURE;  // RETURN
    }

    return rc_SUCCESS;
}

// Apply the record of the specified 'recordType' located in the specified
// 'record' blob at the specified 'recordOffset' (equivalently, at the
// specified 'recordPosition'), carrying the specified 'clusterMessage' and
// having the specified 'sequenceNumber'.  The 'record.length() - recordOffset'
// bytes starting at 'recordPosition' are written to the ledger.  Depending on
// 'recordType':
//   - e_SNAPSHOT / e_UPDATE: write the record, track it as an uncommitted
//     advisory, broadcast it if self is leader, then create an ack which is
//     applied to self (leader) or sent to the leader (follower, unless the
//     consistency level is eventual).
//   - e_COMMIT: write the record, broadcast it if self is leader, invoke the
//     commit callback for the associated advisory and stop tracking it.
//   - e_ACK: count the ack and, once the ack quorum is reached, create and
//     apply a commit record for the associated advisory.
// Return 0 on success, and a non-zero value otherwise; failures of nested
// calls are reported as '10 * <nested rc> + <RcEnum category>'.
int IncoreClusterStateLedger::applyRecordInternal(
    const bdlbb::Blob&                         record,
    int                                        recordOffset,
    const mwcu::BlobPosition&                  recordPosition,
    const bmqp_ctrlmsg::ClusterMessage&        clusterMessage,
    const bmqp_ctrlmsg::LeaderMessageSequence& sequenceNumber,
    ClusterStateRecordType::Enum               recordType)
{
    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS = 0  // Success
        ,
        rc_UNKNOWN = -1  // Unknown result
        ,
        rc_WRITE_FAILURE = -2  // Fail to write record to ledger
        ,
        rc_CREATE_ACK_FAILURE = -3  // Fail to create leader advisory ack
        ,
        rc_CREATE_COMMIT_FAILURE = -4  // Fail to create leader advisory
                                       // commit
        ,
        rc_APPLY_ACK_FAILURE = -5  // Fail to apply leader advisory ack
        ,
        rc_APPLY_COMMIT_FAILURE = -6  // Fail to apply leader advisory commit
        ,
        rc_SEND_ACK_FAILURE = -7  // Fail to send leader advisory ack
                                  // back to leader
        ,
        rc_ADVISORY_NOT_FOUND = -8  // Advisory not found
    };

    int rc = rc_UNKNOWN;
    switch (recordType) {
    case (ClusterStateRecordType::e_SNAPSHOT):
    case (ClusterStateRecordType::e_UPDATE): {
        mqbsi::LedgerRecordId recordId;
        rc = d_ledger_mp->writeRecord(&recordId,
                                      record,
                                      recordPosition,
                                      record.length() - recordOffset);
        if (rc != 0) {
            BALL_LOG_ERROR << description()
                           << "Failed to write record. [reason: write "
                           << "failure, record type: " << recordType
                           << ", rc: " << rc << "]";
            return rc * 10 + rc_WRITE_FAILURE;  // RETURN
        }

        // Track the advisory as uncommitted, together with the id of the
        // ledger record that was just written for it.  Without storing
        // 'recordId', the tracked advisory would keep a default record id
        // until the next log rollover refreshes it.
        ClusterMessageInfo info;
        info.d_clusterMessage = clusterMessage;
        info.d_recordId       = recordId;
        d_uncommittedAdvisories.insert(bsl::make_pair(sequenceNumber, info));

        if (isSelfLeader()) {
            // TBD: How/if to handle timeout for this message?
            //      Option 1: Schedule internal timeout event for this message,
            //                then add advisory to list of timed out messages
            //                if quorum number of ACKs has not been received
            //      Option 2: Leverage RequestManager or MultiRequestManager to
            //                send the message, in response callback handle
            //                timeout by adding advisory to list of timed out
            //                messages if quorum number of ACKs has not been
            //                received
            //      etc.

            // If self is leader who initiated a snapshot or update record, the
            // record offset should be 0.
            BSLS_ASSERT_SAFE(recordOffset == 0);

            bdlbb::Blob advisoryEvent(d_bufferFactory_p, d_allocator_p);
            constructEventBlob(&advisoryEvent, record);
            d_clusterData_p->membership().netCluster()->writeAll(
                advisoryEvent,
                bmqp::EventType::e_CLUSTER_STATE);

            BALL_LOG_INFO << "Broadcasted message '" << clusterMessage
                          << "' to all cluster nodes";
        }

        // A follower does not reply Ack under eventual consistency
        if (!isSelfLeader() &&
            d_consistencyLevel == ClusterStateLedgerConsistency::e_EVENTUAL) {
            return rc_SUCCESS;  // RETURN
        }

        bmqp_ctrlmsg::ClusterMessage ackMessage;
        ackMessage.choice().makeLeaderAdvisoryAck().sequenceNumberAcked() =
            sequenceNumber;

        bdlbb::Blob ackRecord(d_bufferFactory_p, d_allocator_p);
        rc = ClusterStateLedgerUtil::appendRecord(
            &ackRecord,
            ackMessage,
            sequenceNumber,
            currentTime(),
            ClusterStateRecordType::e_ACK);
        if (rc != 0) {
            return 10 * rc + rc_CREATE_ACK_FAILURE;  // RETURN
        }

        // If leader, apply Ack to self.  Else, reply Ack back to leader
        if (isSelfLeader()) {
            rc = applyRecordInternal(ackRecord,
                                     0,
                                     ackMessage,
                                     sequenceNumber,
                                     ClusterStateRecordType::e_ACK);
            if (rc != 0) {
                return 10 * rc + rc_APPLY_ACK_FAILURE;  // RETURN
            }
        }
        else {
            bdlbb::Blob ackEvent(d_bufferFactory_p, d_allocator_p);
            constructEventBlob(&ackEvent, ackRecord);

            mqbnet::ClusterNode* leaderNode =
                d_clusterData_p->electorInfo().leaderNode();
            bmqt::GenericResult::Enum writeRc =
                leaderNode->write(ackEvent, bmqp::EventType::e_CLUSTER_STATE);

            if (bmqt::GenericResult::e_SUCCESS != writeRc) {
                BALL_LOG_ERROR << "#CLUSTER_SEND_FAILURE "
                               << "Failed to send ack: " << ackMessage
                               << " back to leader node "
                               << leaderNode->nodeDescription()
                               << ", rc: " << writeRc;

                return writeRc * 10 + rc_SEND_ACK_FAILURE;  // RETURN
            }
            else {
                BALL_LOG_INFO << "Sent ack '" << ackMessage
                              << "' back to leader node "
                              << leaderNode->nodeDescription();
            }
        }
    } break;  // BREAK
    case (ClusterStateRecordType::e_COMMIT): {
        // PRECONDITIONS
        BSLS_ASSERT_SAFE(d_commitCb);
        BSLS_ASSERT_SAFE(
            clusterMessage.choice().isLeaderAdvisoryCommitValue());

        const bmqp_ctrlmsg::LeaderAdvisoryCommit& commit =
            clusterMessage.choice().leaderAdvisoryCommit();

        // A commit is only meaningful for an advisory that is still tracked
        // as uncommitted.
        AdvisoriesMapIter iter = d_uncommittedAdvisories.find(
            commit.sequenceNumberCommitted());
        if (iter == d_uncommittedAdvisories.end()) {
            BALL_LOG_ERROR << description()
                           << "Failed to apply 'LeaderAdvisoryCommit': "
                           << commit
                           << ". Reason: associated advisory not found. ";
            return rc_ADVISORY_NOT_FOUND;  // RETURN
        }

        // Write record to ledger
        mqbsi::LedgerRecordId recordId;
        rc = d_ledger_mp->writeRecord(&recordId,
                                      record,
                                      recordPosition,
                                      record.length() - recordOffset);
        if (rc != 0) {
            BALL_LOG_ERROR << description()
                           << "Failed to write record. [reason: write "
                           << "failure, record type: " << recordType
                           << ", rc: " << rc << "]";
            return rc * 10 + rc_WRITE_FAILURE;  // RETURN
        }

        if (isSelfLeader()) {
            bdlbb::Blob commitEvent(d_bufferFactory_p, d_allocator_p);
            constructEventBlob(&commitEvent, record);
            d_clusterData_p->membership().netCluster()->writeAll(
                commitEvent,
                bmqp::EventType::e_CLUSTER_STATE);

            BALL_LOG_INFO << "Broadcasted commit message '" << clusterMessage
                          << "' to all cluster nodes";
        }

        // Enqueue commit callback invocation on cluster dispatcher thread
        bmqp_ctrlmsg::ControlMessage committedControlMessage;
        committedControlMessage.choice().makeClusterMessage(
            iter->second.d_clusterMessage);
        // NOTE: For now, the commit callback is invoked in place to reduce
        //       state inconsistencies resulting from thread race conditions
        //       while transitioning to using IncoreCSL.  Once transitioned,
        //       the commit callback should be enqueued to be invoked from the
        //       cluster dispatcher thread.
        // TODO: In phase 2 of IncoreCSL, this can return to enqueueing on the
        //       cluster dispatcher thread
        d_commitCb(committedControlMessage,
                   ClusterStateLedgerCommitStatus::e_SUCCESS);

        // The advisory is now committed; stop tracking it.
        d_uncommittedAdvisories.erase(commit.sequenceNumberCommitted());
    } break;  // BREAK
    case (ClusterStateRecordType::e_ACK): {
        // PRECONDITIONS
        BSLS_ASSERT_SAFE(isSelfLeader());
        BSLS_ASSERT_SAFE(clusterMessage.choice().isLeaderAdvisoryAckValue());

        const bmqp_ctrlmsg::LeaderAdvisoryAck& ack =
            clusterMessage.choice().leaderAdvisoryAck();

        AdvisoriesMapIter iter = d_uncommittedAdvisories.find(
            ack.sequenceNumberAcked());
        if (iter == d_uncommittedAdvisories.end()) {
            BALL_LOG_ERROR << description()
                           << "Failed to apply 'LeaderAdvisoryAck': " << ack
                           << ". Reason: associated advisory not found. ";
            return rc_ADVISORY_NOT_FOUND;  // RETURN
        }

        // The commit is triggered exactly once, by the ack that brings the
        // count to the quorum; later acks only increase the count.
        iter->second.d_ackCount += 1;
        if (iter->second.d_ackCount == d_ackQuorum) {
            // Consistency level reached. Apply a commit message for the
            // advisory, broadcast it, and invoke the 'CommitCb'.
            bmqp_ctrlmsg::ClusterMessage        commitMessage;
            bmqp_ctrlmsg::LeaderAdvisoryCommit& commitAdvisory =
                commitMessage.choice().makeLeaderAdvisoryCommit();
            d_clusterData_p->electorInfo().nextLeaderMessageSequence(
                &commitAdvisory.sequenceNumber());
            commitAdvisory.sequenceNumberCommitted() =
                ack.sequenceNumberAcked();

            bdlbb::Blob commitRecord(d_bufferFactory_p, d_allocator_p);
            rc = ClusterStateLedgerUtil::appendRecord(
                &commitRecord,
                commitMessage,
                commitAdvisory.sequenceNumber(),
                currentTime(),
                ClusterStateRecordType::e_COMMIT);
            if (rc != 0) {
                return 10 * rc + rc_CREATE_COMMIT_FAILURE;  // RETURN
            }

            rc = applyRecordInternal(commitRecord,
                                     0,
                                     commitMessage,
                                     commitAdvisory.sequenceNumber(),
                                     ClusterStateRecordType::e_COMMIT);
            if (rc != 0) {
                return 10 * rc + rc_APPLY_COMMIT_FAILURE;  // RETURN
            }
        }
    } break;  // BREAK
    case ClusterStateRecordType::e_UNDEFINED:
    default: {
        // We should never be here
        BSLS_ASSERT_SAFE(
            false &&
            "IncoreClusterStateLedger apply with invalid record type");
    }
    }

    return rc_SUCCESS;
}

// Convenience overload taking only the specified 'recordOffset': compute the
// matching blob position within the specified 'record' and forward to the
// overload taking both the offset and the position, along with the specified
// 'clusterMessage', 'sequenceNumber' and 'recordType'.  Return the result of
// that overload.
int IncoreClusterStateLedger::applyRecordInternal(
    const bdlbb::Blob&                         record,
    int                                        recordOffset,
    const bmqp_ctrlmsg::ClusterMessage&        clusterMessage,
    const bmqp_ctrlmsg::LeaderMessageSequence& sequenceNumber,
    ClusterStateRecordType::Enum               recordType)
{
    // Offset -> position.  The conversion is only checked in safe builds.
    mwcu::BlobPosition position;
    const int          findRc = mwcu::BlobUtil::findOffsetSafe(&position,
                                                      record,
                                                      recordOffset);
    BSLS_ASSERT_SAFE(0 == findRc);
    static_cast<void>(findRc);  // silence 'unused' when asserts are disabled

    return applyRecordInternal(record,
                               recordOffset,
                               position,
                               clusterMessage,
                               sequenceNumber,
                               recordType);
}

// Convenience overload taking only the specified 'recordPosition': compute
// the matching byte offset within the specified 'record' and forward to the
// overload taking both the offset and the position, along with the specified
// 'clusterMessage', 'sequenceNumber' and 'recordType'.  Return the result of
// that overload.
int IncoreClusterStateLedger::applyRecordInternal(
    const bdlbb::Blob&                         record,
    const mwcu::BlobPosition&                  recordPosition,
    const bmqp_ctrlmsg::ClusterMessage&        clusterMessage,
    const bmqp_ctrlmsg::LeaderMessageSequence& sequenceNumber,
    ClusterStateRecordType::Enum               recordType)
{
    // Position -> offset.  The conversion is only checked in safe builds.
    int       offset    = 0;
    const int convertRc = mwcu::BlobUtil::positionToOffsetSafe(
        &offset,
        record,
        recordPosition);
    BSLS_ASSERT_SAFE(0 == convertRc);
    static_cast<void>(convertRc);  // silence 'unused' when asserts are off

    return applyRecordInternal(record,
                               offset,
                               recordPosition,
                               clusterMessage,
                               sequenceNumber,
                               recordType);
}

void IncoreClusterStateLedger::cancelUncommittedAdvisories()
{
    if (isSelfLeader()) {
        for (AdvisoriesMapIter iter = d_uncommittedAdvisories.begin();
             iter != d_uncommittedAdvisories.end();
             ++iter) {
            const ClusterMessageInfo&    info = iter->second;
            bmqp_ctrlmsg::ControlMessage controlMessage;
            controlMessage.choice().makeClusterMessage(info.d_clusterMessage);
            // TBD: We may want to consider firing the commitCb later in
            //      the dispatcher thread, just like we do in the other
            //      case.
            d_commitCb(controlMessage,
                       ClusterStateLedgerCommitStatus::e_CANCELED);
        }
    }
    d_uncommittedAdvisories.clear();
}

int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob&   event,
                                        mqbnet::ClusterNode* source,
                                        bool                 delayed)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Validate the cluster state record carried by the specified 'event',
    // received from the specified 'source' node, and apply it to this
    // ledger if it is valid.  If the specified 'delayed' flag is true, the
    // source and leader-sequence-number checks are skipped.  Return 0 on
    // success, and a non-zero value otherwise: either one of the 'RcEnum'
    // values below, or 'rc * 10 + <RcEnum value>' when the failure 'rc'
    // comes from a lower-level routine.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(source);
    BSLS_ASSERT_SAFE(source->nodeId() !=
                     d_clusterData_p->membership().selfNode()->nodeId());
    BSLS_ASSERT_SAFE(
        isSelfLeader() ||
        source->nodeId() ==
            d_clusterData_p->electorInfo().leaderNode()->nodeId());

    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS = 0  // Success
        ,
        rc_MISSING_HEADER = -1  // Event or record header is missing
        ,
        rc_INVALID_HEADER = -2  // Event or record header is invalid
        ,
        rc_INVALID_SOURCE = -3  // Source is not leader
        ,
        rc_RECORD_STALE = -4  // Record is stale
        ,
        rc_RECORD_ALREADY_APPLIED = -5  // Record was already applied
        ,
        rc_LOAD_MESSAGE_FAILURE = -6  // Fail to load cluster message
        ,
        rc_NODE_STOPPING = -7  // Self node is stopping
        ,
        rc_ADVISORY_INVALID = -8  // Advisory is invalid, as determined
                                  // by type-specific validation
        ,
        rc_APPLY_RECORD_FAILURE = -9  // Fail to apply record
    };

    BALL_LOG_INFO << "Applying cluster state record event from node '"
                  << source->nodeDescription() << "'";

    // Sanity check on record.  The event header is first read with its
    // minimum size, then resized to the size advertised by the header
    // itself.
    mwcu::BlobObjectProxy<bmqp::EventHeader> eventHeader(
        &event,
        -bmqp::EventHeader::k_MIN_HEADER_SIZE,
        true,    // read
        false);  // write
    if (!eventHeader.isSet()) {
        return rc_MISSING_HEADER;  // RETURN
    }

    const int eventHeaderSize = eventHeader->headerWords() *
                                bmqp::Protocol::k_WORD_SIZE;
    if (eventHeaderSize < bmqp::EventHeader::k_MIN_HEADER_SIZE) {
        return rc_INVALID_HEADER;  // RETURN
    }
    eventHeader.resize(eventHeaderSize);
    if (!eventHeader.isSet()) {
        return rc_MISSING_HEADER;  // RETURN
    }
    BSLS_ASSERT_SAFE(eventHeader->length() == event.length());
    BSLS_ASSERT_SAFE(eventHeader->type() == bmqp::EventType::e_CLUSTER_STATE);

    // The record header immediately follows the event header.
    mwcu::BlobPosition recordHeaderPosition;
    int rc = mwcu::BlobUtil::findOffsetSafe(&recordHeaderPosition,
                                            event,
                                            eventHeaderSize);
    if (rc != 0) {
        return rc_MISSING_HEADER;  // RETURN
    }

    mwcu::BlobObjectProxy<ClusterStateRecordHeader> recordHeader(
        &event,
        recordHeaderPosition,
        -ClusterStateRecordHeader::k_HEADER_NUM_WORDS,
        true,    // read
        false);  // write
    if (!recordHeader.isSet()) {
        return rc_MISSING_HEADER;  // RETURN
    }
    recordHeader.resize(recordHeader->headerWords() *
                        bmqp::Protocol::k_WORD_SIZE);
    if (!recordHeader.isSet()) {
        return rc_MISSING_HEADER;  // RETURN
    }

    rc = ClusterStateLedgerUtil::validateRecordHeader(*recordHeader);
    if (rc != ClusterStateLedgerUtilRc::e_SUCCESS) {
        BALL_LOG_ERROR << description() << "Failed to apply record from '"
                       << source->nodeDescription()
                       << "'. Reason: invalid header.";
        return rc * 10 + rc_INVALID_HEADER;  // RETURN
    }
    BSLS_ASSERT_SAFE(ClusterStateLedgerUtil::recordSize(*recordHeader) ==
                     (event.length() - eventHeaderSize));

    // A non-ack record whose sequence number is already tracked among the
    // uncommitted advisories has already been applied.
    bmqp_ctrlmsg::LeaderMessageSequence seqNum;
    seqNum.electorTerm()    = recordHeader->electorTerm();
    seqNum.sequenceNumber() = recordHeader->sequenceNumber();
    if (recordHeader->recordType() != ClusterStateRecordType::e_ACK &&
        d_uncommittedAdvisories.find(seqNum) !=
            d_uncommittedAdvisories.end()) {
        BALL_LOG_ERROR << description() << "Failed to apply record from '"
                       << source->nodeDescription()
                       << "'. Reason: record was already applied. ";
        return rc_RECORD_ALREADY_APPLIED;  // RETURN
    }

    // Validate advisory and source
    if (!delayed) {
        // Source (leader) and leader sequence number should not be validated
        // for delayed (aka buffered) advisories.  Those attributes were
        // validated when buffered advisories were received.

        if (d_clusterData_p->electorInfo().leaderNode() != source) {
            // Different leader.  Ignore message.
            BALL_LOG_ERROR << description() << "Ignoring event from '"
                           << source->nodeDescription()
                           << "'. Reason: Source node is not the leader"
                           << " [source: " << source->nodeDescription()
                           << ", leader: "
                           << (d_clusterData_p->electorInfo().leaderNode()
                                   ? d_clusterData_p->electorInfo()
                                         .leaderNode()
                                         ->nodeDescription()
                                   : "** none **")
                           << "].";
            return rc_INVALID_SOURCE;  // RETURN
        }

        if (seqNum < d_clusterData_p->electorInfo().leaderMessageSequence()) {
            BALL_LOG_ERROR
                << description() << "Failed to apply record from '"
                << source->nodeDescription() << "'. Reason: record is stale "
                << "[sequenceNumber: " << seqNum << ", leaderMessageSeq: "
                << d_clusterData_p->electorInfo().leaderMessageSequence()
                << "].";
            return rc_RECORD_STALE;  // RETURN
        }

        // When CSL mode is enabled, leader status and sequence number are
        // always updated here.  They may already have been updated by one of
        // the callers of this routine, but there is no harm in setting these
        // values again.
        if (d_clusterData_p->clusterConfig()
                .clusterAttributes()
                .isCSLModeEnabled()) {
            d_clusterData_p->electorInfo().setLeaderMessageSequence(seqNum);
            d_clusterData_p->electorInfo().setLeaderStatus(
                mqbc::ElectorInfoLeaderStatus::e_ACTIVE);
        }
    }

    // Load cluster message from record
    bmqp_ctrlmsg::ClusterMessage message;
    rc = ClusterStateLedgerUtil::loadClusterMessage(&message,
                                                    *recordHeader,
                                                    event,
                                                    eventHeaderSize);
    if (rc != 0) {
        return rc * 10 + rc_LOAD_MESSAGE_FAILURE;  // RETURN
    }

    // CONDITIONS: If advisory is delayed, it must be a queue advisory (only
    //             those are buffered) or a commit of queue advisory
    BSLS_ASSERT_SAFE(!delayed ||
                     message.choice().isQueueAssignmentAdvisoryValue() ||
                     message.choice().isQueueUnassignedAdvisoryValue() ||
                     message.choice().isQueueUnAssignmentAdvisoryValue() ||
                     message.choice().isLeaderAdvisoryCommitValue());

    // More validations (type-specific): A queue advisory
    if (message.choice().isQueueAssignmentAdvisoryValue() ||
        message.choice().isQueueUnassignedAdvisoryValue() ||
        message.choice().isQueueUnAssignmentAdvisoryValue()) {
        // Queue advisory

        // TBD: Suppress the following check for now, which will help some
        // integration tests to pass.  At this point, it is not clear if it is
        // safe to process queue advisory messages when self is stopping.
        //
        // if (   d_clusterData_p->membership().selfNodeStatus()
        //     == bmqp_ctrlmsg::NodeStatus::E_STOPPING) {
        //     // No need to process the advisory since self is stopping.
        //     BALL_LOG_INFO << description()
        //                   << "Ignoring event from '"
        //                   << source->nodeDescription()
        //                   << "'. Reason: Self is stopping.";
        //     return rc_NODE_STOPPING;                               // RETURN
        // }
    }
    else if (message.choice().isPartitionPrimaryAdvisoryValue()) {
        const bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo>& partitions =
            message.choice().partitionPrimaryAdvisory().partitions();
        // Validate the notification.  If *any* part of notification is found
        // invalid, we reject the *entire* notification.
        for (int i = 0; i < static_cast<int>(partitions.size()); ++i) {
            const bmqp_ctrlmsg::PartitionPrimaryInfo& info = partitions[i];

            // Reject any id outside '[0, number of partitions)': the id is
            // used further below to look up the partition in the cluster
            // state, so both bounds must hold.
            if (info.partitionId() < 0 ||
                info.partitionId() >=
                    static_cast<int>(d_clusterState_p->partitions().size())) {
                MWCTSK_ALARMLOG_ALARM("CLUSTER")
                    << d_clusterData_p->identity().description()
                    << ": Invalid partitionId: " << info
                    << " specified in partition-primary advisory. "
                    << "Ignoring this *ENTIRE* advisory message."
                    << MWCTSK_ALARMLOG_END;
                return rc_ADVISORY_INVALID;  // RETURN
            }

            mqbnet::ClusterNode* proposedPrimaryNode =
                d_clusterData_p->membership().netCluster()->lookupNode(
                    info.primaryNodeId());
            if (proposedPrimaryNode == 0) {
                MWCTSK_ALARMLOG_ALARM("CLUSTER")
                    << d_clusterData_p->identity().description()
                    << ": Invalid primaryNodeId: " << info
                    << " specified in partition-primary advisory."
                    << " Ignoring this *ENTIRE* advisory."
                    << MWCTSK_ALARMLOG_END;
                return rc_ADVISORY_INVALID;  // RETURN
            }

            if (proposedPrimaryNode ==
                    d_clusterData_p->membership().selfNode() &&
                bmqp_ctrlmsg::NodeStatus::E_STARTING ==
                    d_clusterData_p->membership().selfNodeStatus()) {
                // Self is the proposed primary but self is STARTING.  This is
                // a bug because if this node perceives self as STARTING, any
                // other node (including the leader) *cannot* perceive this
                // node as AVAILABLE.  This node might be STOPPING, but that's
                // ok since its possible that it transitioned from AVAILABLE to
                // other state immediately after leader broadcast the advisory.
                // Lower layers will take care of that scenario.

                MWCTSK_ALARMLOG_ALARM("CLUSTER")
                    << d_clusterData_p->identity().description()
                    << ": proposed primary specified in partition/primary : "
                    << info << " is self but self is STARTING. "
                    << "Ignoring this *ENTIRE* advisory."
                    << MWCTSK_ALARMLOG_END;
                return rc_ADVISORY_INVALID;  // RETURN
            }

            const ClusterStatePartitionInfo& pi = d_clusterState_p->partition(
                info.partitionId());
            if (d_clusterData_p->membership().selfNode() !=
                    proposedPrimaryNode &&
                d_clusterData_p->membership().selfNode() == pi.primaryNode() &&
                bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == pi.primaryStatus() &&
                bmqp_ctrlmsg::NodeStatus::E_AVAILABLE ==
                    d_clusterData_p->membership().selfNodeStatus()) {
                // Self node is available, and views self as active primary of
                // this partition, but has received an advisory from the leader
                // indicating that a different node is the primary for this
                // partition.  This downgrade scenario (primary -> replica) is
                // currently not supported, so self node will exit.  Note that
                // this scenario can be witnessed in a bad network where some
                // nodes cannot see other nodes intermittently.  See
                // 'onLeaderSyncDataQueryResponseDispatched' for similar check.
                MWCTSK_ALARMLOG_ALARM("CLUSTER")
                    << d_clusterData_p->identity().description()
                    << " PartitionId [" << info.partitionId()
                    << "]: self node views self as active/available primary, "
                    << "but a different node is proposed as primary in the "
                    << "partition/primary mapping: " << info << ". This "
                    << "downgrade from primary to replica is currently not "
                    << "supported, and self node will exit."
                    << MWCTSK_ALARMLOG_END;

                mqbu::ExitUtil::terminate(
                    mqbu::ExitCode::e_UNSUPPORTED_SCENARIO);
                // EXIT
            }

            if (d_isFirstLeaderAdvisory) {
                // If this node just started and recovered from a peer, it will
                // already be aware of the *current* primaryLeaseId for each
                // partition, but not its primary node (this is because leaseId
                // is retrieved from the storage, but not the primary nodeId,
                // because we don't persist the primary nodeId).  When this
                // node becomes AVAILABLE, the leader simply sends the
                // partition/primary advisory without bumping up the leaseId.
                // 'd_isFirstLeaderAdvisory' flag takes care of this scenario.

                // Note that 'pi.primaryNode()' may not be zero because
                // currently, we update the cluster state even upon receiving
                // primary status advisory from a primary node, so self node
                // may or may not have received a primary-status advisory from
                // the primary node.

                if (info.primaryLeaseId() < pi.primaryLeaseId()) {
                    MWCTSK_ALARMLOG_ALARM("CLUSTER")
                        << d_clusterData_p->identity().description()
                        << ": Stale primaryLeaseId specified in: " << info
                        << ", current primaryLeaseId: " << pi.primaryLeaseId()
                        << ". Ignoring this *ENTIRE* advisory."
                        << MWCTSK_ALARMLOG_END;
                    return rc_ADVISORY_INVALID;  // RETURN
                }
            }
            else {
                if ((pi.primaryNode() == proposedPrimaryNode) ||
                    (pi.primaryNode() == 0)) {
                    // Proposed primary node is same as self's primary node, or
                    // self views this partition as orphan.  In either case,
                    // leaseId cannot be smaller.  It can, however, be equal.
                    // The case in which 'pi.primaryNode() ==
                    // proposedPrimaryNode' and leaseId is same, is obvious --
                    // the leader simply re-sent the partition primary mapping
                    // advisory.  But the case where pi.primaryNode() is null
                    // (and 'proposedPrimaryNode' is valid) *and* leaseId is
                    // same can be explained in this way: this node (replica)
                    // was aware of the primary and leaseId, but then at some
                    // point, lost connection to the primary, and marked this
                    // partition as orphan.  Note that primary node did not
                    // crash.  After some time, connection was re-established,
                    // and leader/primary resent the primary mapping again --
                    // with same leaseId.  This scenario was seen when cluster
                    // was running on VM boxes.  Also note that above scenario
                    // is different from the case where a node has not heard
                    // from leader even once.

                    if (info.primaryLeaseId() < pi.primaryLeaseId()) {
                        MWCTSK_ALARMLOG_ALARM("CLUSTER")
                            << d_clusterData_p->identity().description()
                            << ": Stale primaryLeaseId specified "
                            << "in: " << info << ", current primaryLeaseId: "
                            << pi.primaryLeaseId()
                            << ". Primary node viewed by self: "
                            << (pi.primaryNode() != 0
                                    ? pi.primaryNode()->nodeDescription()
                                    : "** null **")
                            << ", proposed primary node: "
                            << proposedPrimaryNode->nodeDescription()
                            << ". Ignoring this *ENTIRE* advisory."
                            << MWCTSK_ALARMLOG_END;
                        return rc_ADVISORY_INVALID;  // RETURN
                    }
                }
                else {
                    // Different (non-zero) primary nodes.  Proposed leaseId
                    // must be greater.

                    if (info.primaryLeaseId() <= pi.primaryLeaseId()) {
                        MWCTSK_ALARMLOG_ALARM("CLUSTER")
                            << d_clusterData_p->identity().description()
                            << ": Stale primaryLeaseId specified in: " << info
                            << ", current primaryLeaseId: "
                            << pi.primaryLeaseId()
                            << ". Ignoring this *ENTIRE* advisory."
                            << MWCTSK_ALARMLOG_END;
                        return rc_ADVISORY_INVALID;  // RETURN
                    }
                }
            }
        }
    }

    BALL_LOG_INFO << description() << " Applying cluster message from '"
                  << source->nodeDescription() << "': " << message;

    // All validations passed: apply the record.  The record starts right
    // after the event header, hence 'eventHeaderSize' as its offset.
    rc = applyRecordInternal(event,
                             eventHeaderSize,
                             recordHeaderPosition,
                             message,
                             seqNum,
                             recordHeader->recordType());
    if (rc != 0) {
        BALL_LOG_WARN << description() << "Failed to apply record from '"
                      << source->nodeDescription()
                      << "' [sequenceNumber: " << seqNum
                      << ", leaderMessageSeq: "
                      << d_clusterData_p->electorInfo().leaderMessageSequence()
                      << "]. rc: " << rc;
        return rc * 10 + rc_APPLY_RECORD_FAILURE;  // RETURN
    }

    return rc_SUCCESS;
}

// CREATORS
IncoreClusterStateLedger::IncoreClusterStateLedger(
    const mqbcfg::ClusterDefinition&    clusterDefinition,
    ClusterStateLedgerConsistency::Enum consistencyLevel,
    ClusterData*                        clusterData,
    ClusterState*                       clusterState,
    bdlbb::BlobBufferFactory*           bufferFactory,
    bslma::Allocator*                   allocator)
: d_allocator_p(allocator)
, d_isFirstLeaderAdvisory(true)
, d_isOpen(false)
, d_bufferFactory_p(bufferFactory)
, d_description(allocator)
, d_commitCb()
, d_clusterData_p(clusterData)
, d_clusterState_p(clusterState)
, d_consistencyLevel(consistencyLevel)
, d_ackQuorum(consistencyLevel == ClusterStateLedgerConsistency::e_STRONG
                  ? (clusterDefinition.nodes().size() / 2) + 1
                  : 1)
    // With strong consistency the quorum is a majority of the configured
    // nodes; otherwise it is 1.
, d_ledgerConfig(allocator)
, d_ledger_mp(0)
, d_uncommittedAdvisories(allocator)
{
    // Create a ledger in the closed state.  The underlying 'mqbsl::Ledger'
    // is only created in 'open()'; this constructor merely prepares the
    // description string and the ledger configuration.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(clusterState);

    // Cache the description, which embeds the cluster name
    mwcu::MemOutStream descriptionStream;
    descriptionStream << "IncoreClusterStateLedger (cluster: "
                      << d_clusterData_p->identity().name() << ") : ";
    d_description.assign(descriptionStream.str().data(),
                         descriptionStream.str().length());

    // Collaborators required by the ledger config
    bsl::shared_ptr<mqbsi::LogIdGenerator> idGenerator(
        new (*d_allocator_p)
            IncoreClusterStateLeger_LogIdGenerator(d_allocator_p),
        d_allocator_p);

    bsl::shared_ptr<mqbsi::LogFactory> factory(
        new (*d_allocator_p)
            mqbsl::MemoryMappedOnDiskLogFactory(d_allocator_p),
        d_allocator_p);

    // Populate the ledger config.  Location and sizing attributes come from
    // the cluster's partition config; note that the maximum log size is
    // taken from the partition config's 'maxQlistFileSize'.
    const mqbcfg::PartitionConfig& partitionConfig =
        clusterDefinition.partitionConfig();

    d_ledgerConfig.setLocation(partitionConfig.location());
    d_ledgerConfig.setPattern(k_FILE_PATTERN);
    d_ledgerConfig.setMaxLogSize(partitionConfig.maxQlistFileSize());
    d_ledgerConfig.setReserveOnDisk(partitionConfig.preallocate());
    d_ledgerConfig.setPrefaultPages(partitionConfig.prefaultPages());
    d_ledgerConfig.setLogIdGenerator(idGenerator);
    d_ledgerConfig.setLogFactory(factory);
    d_ledgerConfig.setExtractLogIdCallback(
        ClusterStateLedgerUtil::extractLogId);
    d_ledgerConfig.setValidateLogCallback(
        ClusterStateLedgerUtil::validateLog);
    d_ledgerConfig.setRolloverCallback(
        bdlf::BindUtil::bind(&IncoreClusterStateLedger::onLogRolloverCb,
                             this,
                             bdlf::PlaceHolders::_1,    // oldLogId
                             bdlf::PlaceHolders::_2));  // newLogId
    d_ledgerConfig.setCleanupCallback(
        bdlf::BindUtil::bind(&IncoreClusterStateLedger::cleanupLog,
                             this,
                             bdlf::PlaceHolders::_1));  // logPath
}

IncoreClusterStateLedger::~IncoreClusterStateLedger()
{
    // NOTHING: no explicit cleanup is performed here.  In particular, this
    // destructor does not call 'close()'.
}

// MANIPULATORS
//   (virtual mqbc::ElectorInfoObserver)
void IncoreClusterStateLedger::onClusterLeader(
    BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* node,
    ElectorInfoLeaderStatus::Enum               status)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // React to a leader notification carrying the specified 'status': for
    // every status other than 'e_PASSIVE', cancel all uncommitted
    // advisories.  The specified 'node' is not used.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));

    if (status != ElectorInfoLeaderStatus::e_PASSIVE) {
        cancelUncommittedAdvisories();
    }
}

// MANIPULATORS
//   (virtual mqbc::ClusterStateLedger)
int IncoreClusterStateLedger::open()
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Create and open the underlying ledger, then position it for writing
    // after the last record found.  Return 0 on success and a non-zero
    // value otherwise: 'rc_ALREADY_OPENED' if this object is already open,
    // or '10 * rc + <RcEnum value>' where 'rc' is the failure code of the
    // underlying ledger operation.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));

    BALL_LOG_INFO << description()
                  << ": Opening IncoreCSL with config: " << d_ledgerConfig;

    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS               = 0,   // Success
        rc_ALREADY_OPENED        = -1,  // CSL Already opened
        rc_OPEN_FAILURE          = -2,  // Failure to open ledger
        rc_INTERNAL_LEDGER_ERROR = -3   // Internal ledger error
    };

    if (d_isOpen) {
        return rc_ALREADY_OPENED;  // RETURN
    }

    // Instantiate a fresh ledger from the stored config and open it,
    // creating it on disk if needed.
    d_ledger_mp.load(new (*d_allocator_p)
                         mqbsl::Ledger(d_ledgerConfig, d_allocator_p),
                     d_allocator_p);

    const int openRc = d_ledger_mp->open(mqbsl::Ledger::e_CREATE_IF_MISSING);
    if (0 != openRc) {
        return 10 * openRc + rc_OPEN_FAILURE;  // RETURN
    }

    // Advance through the records until 'next()' stops returning 0.  The
    // record id at which the iterator stops provides the outstanding number
    // of bytes and the write offset used below.
    IncoreClusterStateLedgerIterator cslIter(d_ledger_mp.get());
    while (0 == cslIter.next()) {
        // Nothing to do with the record itself; keep advancing.
    }

    const int numBytesRc = d_ledger_mp->setOutstandingNumBytes(
        cslIter.currRecordId().logId(),
        cslIter.currRecordId().offset());
    if (0 != numBytesRc) {
        return 10 * numBytesRc + rc_INTERNAL_LEDGER_ERROR;  // RETURN
    }

    const int seekRc = d_ledger_mp->currentLog()->seek(
        cslIter.currRecordId().offset());
    if (0 != seekRc) {
        return 10 * seekRc + rc_INTERNAL_LEDGER_ERROR;  // RETURN
    }

    d_isOpen = true;

    return rc_SUCCESS;
}

int IncoreClusterStateLedger::close()
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Cancel all uncommitted advisories and close the underlying ledger.
    // Return 0 on success and a non-zero value otherwise: 'rc_NOT_OPENED'
    // if this object is not open, or '10 * rc + rc_CLOSE_FAILURE' where 'rc'
    // is the failure code returned by the ledger.  Note that on a close
    // failure this object remains marked as open.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));

    enum RcEnum {
        // Value for the various RC error categories
        rc_SUCCESS       = 0,   // Success
        rc_NOT_OPENED    = -1,  // CSL is not opened
        rc_CLOSE_FAILURE = -2   // Failure to close ledger
    };

    if (!d_isOpen) {
        return rc_NOT_OPENED;  // RETURN
    }

    // Pending advisories are canceled before the ledger is closed.
    cancelUncommittedAdvisories();

    const int closeRc = d_ledger_mp->close();
    if (0 != closeRc) {
        return 10 * closeRc + rc_CLOSE_FAILURE;  // RETURN
    }

    d_isOpen = false;

    return rc_SUCCESS;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the specified partition-primary 'advisory' as an 'e_UPDATE'
    // record, and return the result of 'applyAdvisoryInternal'.  Self must
    // be the leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    // Wrap the advisory in a cluster message
    bmqp_ctrlmsg::ClusterMessage wrapper;
    wrapper.choice().makePartitionPrimaryAdvisory(advisory);

    const int rc = applyAdvisoryInternal(wrapper,
                                         advisory.sequenceNumber(),
                                         ClusterStateRecordType::e_UPDATE);
    return rc;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::QueueAssignmentAdvisory& advisory)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the specified queue-assignment 'advisory' as an 'e_UPDATE'
    // record, and return the result of 'applyAdvisoryInternal'.  Self must
    // be the leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    // Wrap the advisory in a cluster message
    bmqp_ctrlmsg::ClusterMessage wrapper;
    wrapper.choice().makeQueueAssignmentAdvisory(advisory);

    const int rc = applyAdvisoryInternal(wrapper,
                                         advisory.sequenceNumber(),
                                         ClusterStateRecordType::e_UPDATE);
    return rc;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::QueueUnassignedAdvisory& advisory)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the specified queue-unassigned 'advisory' as an 'e_UPDATE'
    // record, and return the result of 'applyAdvisoryInternal'.  Self must
    // be the leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    // Wrap the advisory in a cluster message
    bmqp_ctrlmsg::ClusterMessage wrapper;
    wrapper.choice().makeQueueUnassignedAdvisory(advisory);

    const int rc = applyAdvisoryInternal(wrapper,
                                         advisory.sequenceNumber(),
                                         ClusterStateRecordType::e_UPDATE);
    return rc;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::QueueUpdateAdvisory& advisory)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the specified queue-update 'advisory' as an 'e_UPDATE' record,
    // and return the result of 'applyAdvisoryInternal'.  Self must be the
    // leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    // Wrap the advisory in a cluster message
    bmqp_ctrlmsg::ClusterMessage wrapper;
    wrapper.choice().makeQueueUpdateAdvisory(advisory);

    const int rc = applyAdvisoryInternal(wrapper,
                                         advisory.sequenceNumber(),
                                         ClusterStateRecordType::e_UPDATE);
    return rc;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::LeaderAdvisory& advisory)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the specified leader 'advisory' and return the result of
    // 'applyAdvisoryInternal'.  Unlike the other advisory overloads, the
    // record type used here is 'e_SNAPSHOT'.  Self must be the leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    // Wrap the advisory in a cluster message
    bmqp_ctrlmsg::ClusterMessage wrapper;
    wrapper.choice().makeLeaderAdvisory(advisory);

    const int rc = applyAdvisoryInternal(wrapper,
                                         advisory.sequenceNumber(),
                                         ClusterStateRecordType::e_SNAPSHOT);
    return rc;
}

int IncoreClusterStateLedger::apply(
    const bmqp_ctrlmsg::ClusterMessage& clusterMessage)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Dispatch the specified 'clusterMessage' to the 'apply' overload
    // matching its current selection, and return that overload's result.
    // Supported selections are: partition-primary advisory, leader
    // advisory, queue-assignment advisory, queue-unassigned advisory and
    // queue-update advisory.  Any other selection is a contract violation
    // (asserted in safe builds) and yields -1.  Self must be the leader.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));
    BSLS_ASSERT_SAFE(isSelfLeader());

    typedef bmqp_ctrlmsg::ClusterMessageChoice MsgChoice;

    const MsgChoice& choice    = clusterMessage.choice();
    const int        selection = choice.selectionId();

    if (selection == MsgChoice::SELECTION_ID_PARTITION_PRIMARY_ADVISORY) {
        return apply(choice.partitionPrimaryAdvisory());  // RETURN
    }
    if (selection == MsgChoice::SELECTION_ID_LEADER_ADVISORY) {
        return apply(choice.leaderAdvisory());  // RETURN
    }
    if (selection == MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY) {
        return apply(choice.queueAssignmentAdvisory());  // RETURN
    }
    if (selection == MsgChoice::SELECTION_ID_QUEUE_UNASSIGNED_ADVISORY) {
        return apply(choice.queueUnassignedAdvisory());  // RETURN
    }
    if (selection == MsgChoice::SELECTION_ID_QUEUE_UPDATE_ADVISORY) {
        return apply(choice.queueUpdateAdvisory());  // RETURN
    }

    // Undefined or unsupported selection
    BSLS_ASSERT_SAFE(
        false && "Unsupported cluster message type for cluster state ledger");
    return -1;
}

int IncoreClusterStateLedger::apply(const bdlbb::Blob&   event,
                                    mqbnet::ClusterNode* source)
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Apply the cluster state record 'event' received from the specified
    // 'source' node as a non-delayed event, and return the result of
    // 'applyImpl'.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));

    const bool isDelayed = false;

    return applyImpl(event, source, isDelayed);
}

void IncoreClusterStateLedger::setIsFirstLeaderAdvisory(
    bool isFirstLeaderAdvisory)
{
    // Set the flag read by 'applyImpl' when validating the primaryLeaseId
    // of a partition-primary advisory to the specified
    // 'isFirstLeaderAdvisory'.
    d_isFirstLeaderAdvisory = isFirstLeaderAdvisory;
}

// ACCESSORS
//   (virtual mqbc::ClusterStateLedger)
bslma::ManagedPtr<ClusterStateLedgerIterator>
IncoreClusterStateLedger::getIterator() const
{
    // executed by the *CLUSTER DISPATCHER* thread

    // Return a managed pointer to a newly created iterator over the
    // underlying ledger.  The iterator is allocated from, and will be
    // released to, the allocator held by this object.

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(
        d_clusterData_p->cluster()->dispatcher()->inDispatcherThread(
            d_clusterData_p->cluster()));

    IncoreClusterStateLedgerIterator* iterator_p =
        new (*d_allocator_p)
            IncoreClusterStateLedgerIterator(d_ledger_mp.get());

    bslma::ManagedPtr<ClusterStateLedgerIterator> result(iterator_p,
                                                         d_allocator_p);
    return result;
}

}  // close package namespace
}  // close enterprise namespace
